/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.bootstrap.server;

import kiss.exception;
import kiss.logger;
import kiss.util.timer;

import collie.net;
import collie.channel;
import collie.bootstrap.serversslconfig;
import collie.bootstrap.exception;

import std.exception;

/**
    Builder-style entry point that configures and starts a TCP server.

    One acceptor is created on the main event loop and, when an
    EventLoopGroup has been supplied, one more on every loop of that group.
    Each accepted connection gets a child pipeline produced by the factory
    given to childPipeline().
*/
final class ServerBootstrap(PipeLine)
{
    /// Creates a bootstrap that owns a freshly constructed EventLoop.
    this()
    {
        _loop = new EventLoop();
    }

    /// Creates a bootstrap that runs on the supplied event loop.
    this(EventLoop loop)
    {
        _loop = loop;
    }

    /// Sets the (optional) factory used to build the accept pipeline. Returns this.
    auto pipeline(shared AcceptPipelineFactory factory)
    {
        _acceptorPipelineFactory = factory;
        return this;
    }

    /// Sets the SSL configuration; it is only consulted when built with version USE_SSL.
    auto setSSLConfig(ServerSSLConfig config)
    {
        _sslConfig = config;
        return this;
    }

    /// Sets the factory that builds the pipeline of every accepted connection (required).
    auto childPipeline(shared PipelineFactory!PipeLine factory)
    {
        _childPipelineFactory = factory;
        return this;
    }

    /// Sets the optional group of additional event loops that also accept connections.
    auto group(EventLoopGroup group)
    {
        _group = group;
        return this;
    }

    /// Enables or disables the reuse-port option on the listeners (enabled by default).
    auto setReusePort(bool ruse)
    {
        _rusePort = ruse;
        return this;
    }

    /**
        Sets the heartbeat timeout in seconds.

        The stored value is either 0 or in the range 5s ~ 1800s:
            0 disables the heartbeat,
            if(value < 5) value = 5;
            if(value > 1800) value = 1800;
    */
    auto heartbeatTimeOut(uint second)
    {
        // 0 must stay 0: getTimeWheelConfig() treats it as "no heartbeat".
        if (second == 0)
            _timeOut = 0;
        else if (second < 5)
            _timeOut = 5;
        else if (second > 1800)
            _timeOut = 1800;
        else
            _timeOut = second;

        return this;
    }

    /// Sets the address the server will listen on.
    void bind(Address addr)
    {
        _address = addr;
    }

    /// Listens on the given port using the default InternetAddress for that port.
    void bind(ushort port)
    {
        _address = new InternetAddress(port);
    }

    /// Listens on the given ip and port.
    void bind(string ip, ushort port)
    {
        _address = new InternetAddress(ip, port);
    }

    /**
        Stops every acceptor created by startListening().
        Does nothing when the server is not listening.
    */
    void stopListening()
    {
        if (!_listening)
            return;
        scope (exit)
        {
            _listening = false;
            // Forget the stopped acceptors so that a later startListening()
            // does not append to (and later re-stop) stale entries.
            _serverlist = null;
        }
        foreach (ref accept; _serverlist)
        {
            accept.stop();
        }
        // _mainAccept is still null when startListening() threw while
        // creating the main acceptor (the listening flag is already set then).
        if (_mainAccept)
            _mainAccept.stop();
    }

    /// Stops the loop group (if any) and the main loop after waitForStop() was entered.
    void stop()
    {
        if (!_isLoopWait)
            return;
        scope (exit)
            _isLoopWait = false;
        // The group is optional, exactly as in join() and waitForStop().
        if (_group)
            _group.stop();
        _loop.stop();
    }

    /// Waits for the loop group, if one is configured and the server is running.
    void join()
    {
        if (!_isLoopWait)
            return;
        if (_group)
            _group.wait();
    }

    /**
        Starts listening when necessary, starts the loop group (if any) and
        then runs the main event loop.

        Throws: ServerIsRuningException when the server is already running.
    */
    void waitForStop()
    {
        if (_isLoopWait)
            throw new ServerIsRuningException("server is runing!");
        if (!_listening)
            startListening();
        _isLoopWait = true;
        if (_group)
            _group.start();
        _loop.run();
    }

    /**
        Creates and activates the acceptors: one on the main loop and one per
        loop of the group. When a heartbeat timeout is configured, each
        acceptor also starts its timing wheel.

        Throws: ServerIsListeningException when already listening,
                ServerStartException when the address or the child pipeline
                factory has not been set.
    */
    void startListening()
    {
        if (_listening)
            throw new ServerIsListeningException("server is listening!");
        if (_address is null || _childPipelineFactory is null)
            throw new ServerStartException("the address or childPipelineFactory is null!");

        _listening = true;
        uint wheel, time;
        bool beat = getTimeWheelConfig(wheel, time);
        _mainAccept = creatorAcceptor(_loop);
        _mainAccept.initialize();
        if (beat)
            _mainAccept.startTimingWhile(wheel, time);
        if (_group)
        {
            foreach (loop; _group)
            {
                auto acceptor = creatorAcceptor(loop);
                acceptor.initialize();
                _serverlist ~= acceptor;
                if (beat)
                    acceptor.startTimingWhile(wheel, time);
            }
        }
        logDebug("server _listening!");
    }

    /// Returns the configured loop group, or null when none was set.
    EventLoopGroup group()
    {
        return _group;
    }

    /// The main event loop.
    @property EventLoop eventLoop()
    {
        return _loop;
    }

    /// The address given to bind(), or null when bind() has not been called.
    @property Address address()
    {
        return _address;
    }

protected:
    /**
        Builds a bound, listening TcpListener on the given loop and wraps it
        in a ServerAcceptor together with its accept pipeline.

        Throws: SSLException (only with version USE_SSL) when the SSL context
                cannot be generated from the configured ServerSSLConfig.
    */
    auto creatorAcceptor(EventLoop loop)
    {
        auto acceptor = new TcpListener(loop, _address.addressFamily);
        if (_rusePort)
            acceptor.reusePort = _rusePort;
        acceptor.bind(_address);
        acceptor.listen(1024);
        {
            // Linger enabled with a zero timeout on the listening socket.
            Linger optLinger;
            optLinger.on = 1;
            optLinger.time = 0;
            acceptor.setOption(SocketOptionLevel.SOCKET, SocketOption.LINGER, optLinger);
        }
        AcceptPipeline pipe;
        if (_acceptorPipelineFactory)
            pipe = _acceptorPipelineFactory.newPipeline(acceptor);
        else
            pipe = AcceptPipeline.create();

        SSL_CTX* ctx = null;
        version (USE_SSL)
        {
            if (_sslConfig)
            {
                ctx = _sslConfig.generateSSLCtx();
                if (!ctx)
                    throw new SSLException("Can not gengrate SSL_CTX");
            }
        }

        return new ServerAcceptor!(PipeLine)(acceptor, pipe, _childPipelineFactory, ctx);
    }

    /**
        Derives the timing-wheel layout from the heartbeat timeout.

        Params:
            whileSize = receives the number of wheel slots
            time      = receives the tick interval, computed as
                        timeout * 1000 / slots
        Returns: false when the heartbeat is disabled (timeout == 0), in
                 which case both out parameters keep their initial value 0.
    */
    bool getTimeWheelConfig(out uint whileSize, out uint time)
    {
        if (_timeOut == 0)
            return false;
        if (_timeOut <= 40)
        {
            whileSize = 50;
            time = _timeOut * 1000 / 50;
        }
        else if (_timeOut <= 120)
        {
            whileSize = 60;
            time = _timeOut * 1000 / 60;
        }
        else if (_timeOut <= 600)
        {
            whileSize = 100;
            time = _timeOut * 1000 / 100;
        }
        else if (_timeOut < 1000)
        {
            whileSize = 150;
            time = _timeOut * 1000 / 150;
        }
        else
        {
            whileSize = 180;
            time = _timeOut * 1000 / 180;
        }
        return true;
    }

private:
    shared AcceptPipelineFactory _acceptorPipelineFactory;
    shared PipelineFactory!PipeLine _childPipelineFactory;

    ServerAcceptor!(PipeLine) _mainAccept;
    EventLoop _loop;

    ServerAcceptor!(PipeLine)[] _serverlist;
    EventLoopGroup _group;

    bool _listening = false;
    bool _rusePort = true;
    bool _isLoopWait = false;
    uint _timeOut = 0;
    Address _address;

    ServerSSLConfig _sslConfig = null;
}

private:

import std.functional;
import kiss.event.timer.common;
import collie.utils.memory;
import collie.net;

/**
    Last handler of an accept pipeline: turns every accepted socket into a
    connection with its own child pipeline and keeps the live connections in
    an intrusive doubly linked list headed by a sentinel node.
*/
final class ServerAcceptor(PipeLine) : InboundHandler!(Socket)
{
    /**
        Params:
            acceptor          = listener whose accepted connections are handled
            pipe              = accept pipeline; this handler is appended to it
                                and the pipeline is finalized here
            clientPipeFactory = factory for the per-connection pipelines
            ctx               = SSL context, or null for plain TCP
    */
    this(TcpListener acceptor, AcceptPipeline pipe,
            shared PipelineFactory!PipeLine clientPipeFactory, SSL_CTX* ctx = null)
    {
        _acceptor = acceptor;
        _pipeFactory = clientPipeFactory;
        pipe.addBack(this);
        pipe.finalize();
        _pipe = pipe;
        _pipe.transport(_acceptor);
        _acceptor.onConnectionAccepted(&acceptCallBack);
        _sslctx = ctx;
        // Sentinel head nodes: real entries are linked behind them.
        _list = new ServerConnection!PipeLine();
        version (USE_SSL)
            _sharkList = new SSLHandShark();
    }

    /// Fires transportActive on the accept pipeline.
    void initialize()
    {
        _pipe.transportActive();
    }

    /// Fires transportInactive on the accept pipeline.
    void stop()
    {
        _pipe.transportInactive();
    }

    /**
        Handles a newly accepted socket.

        With version USE_SSL and an SSL context present, an SSL session is
        created and the connection is parked in the handshake list until the
        handshake finishes; otherwise the connection is started immediately.
    */
    override void read(Context ctx, Socket msg)
    {
        version (USE_SSL)
        {
            if (_sslctx)
            {
                auto ssl = SSL_new(_sslctx);
                if (ssl is null)
                {
                    // Without a session the connection cannot be served;
                    // close it instead of leaking the descriptor.
                    logError("SSL_new error: fd = ", msg.handle());
                    msg.close();
                    return;
                }
                static if (IOMode == IO_MODE.iocp)
                {
                    BIO* readBIO = BIO_new(BIO_s_mem());
                    BIO* writeBIO = BIO_new(BIO_s_mem());
                    SSL_set_bio(ssl, readBIO, writeBIO);
                    SSL_set_accept_state(ssl);
                    auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl, readBIO, writeBIO);
                }
                else
                {
                    // SSL_set_fd returns 1 on success and 0 on failure, so a
                    // "< 0" test would never detect the error.
                    if (SSL_set_fd(ssl, msg.handle()) != 1)
                    {
                        logError("SSL_set_fd error: fd = ", msg.handle());
                        // No handshake has started, so there is nothing to
                        // shut down; just release the session and the socket.
                        SSL_free(ssl);
                        msg.close();
                        return;
                    }
                    SSL_set_accept_state(ssl);
                    auto asynssl = new SSLSocket(_acceptor.eventLoop, msg, ssl);
                }
                auto shark = new SSLHandShark(asynssl, &doHandShark);

                // Link the pending handshake right behind the sentinel.
                shark.next = _sharkList.next;
                if (shark.next)
                    shark.next.prev = shark;
                shark.prev = _sharkList;
                _sharkList.next = shark;

                asynssl.start();
            }
            else
            {
                auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
                startSocket(asyntcp);
            }
        }
        else
        {
            auto asyntcp = new TcpStream(_acceptor.eventLoop, msg);
            startSocket(asyntcp);
        }
    }

    /// Starts the listener; a failure is logged and not propagated.
    override void transportActive(Context ctx)
    {
        logDebug("acept transportActive");
        try
        {
            _acceptor.start();
        }
        catch (Exception)
        {
            logError("acceptor start error!");
        }
    }

    /// Closes the listener and every tracked connection, then stops the listener's event loop.
    override void transportInactive(Context ctx)
    {
        _acceptor.close();
        auto con = _list.next;
        _list.next = null;
        while (con)
        {
            // Fetch the successor first: close() may end up in remove(),
            // which frees the node.
            auto tcon = con;
            con = con.next;
            tcon.close();
        }
        _acceptor.eventLoop.stop();
    }

protected:
    /// Unlinks the connection from the list and frees it with gcFree.
    pragma(inline) void remove(ServerConnection!PipeLine conn)
    {
        conn.prev.next = conn.next;
        if (conn.next)
            conn.next.prev = conn.prev;
        gcFree(conn);
    }

    /// Listener callback: feeds the accepted stream into the accept pipeline, logging any exception.
    void acceptCallBack(TcpListener sender, TcpStream stream)
    {
        catchAndLogException(_pipe.read(stream));
    }

    /// The wrapped listener.
    @property acceptor()
    {
        return _acceptor;
    }

    /**
        Creates and starts the heartbeat timer and its timing wheel.
        Does nothing when a timer already exists.

        Params:
            whileSize = number of slots of the timing wheel
            time      = interval passed to the KissTimer
    */
    void startTimingWhile(uint whileSize, uint time)
    {
        if (_timer)
            return;
        _timer = new KissTimer(_acceptor.eventLoop, time);
        _timer.onTick(&doWheel);
        _wheel = new TimingWheel(whileSize);
        _timer.start();
    }

    /// Timer tick: advances the timing wheel when one exists.
    void doWheel(Object)
    {
        if (_wheel)
            _wheel.prevWheel();
    }

    version (USE_SSL)
    {
        /**
            Completion callback of a pending handshake. Unlinks and destroys
            the handshake node; sock is null when the handshake failed,
            otherwise the connection is started on it.
        */
        void doHandShark(SSLHandShark shark, SSLSocket sock)
        {
            shark.prev.next = shark.next;
            if (shark.next)
                shark.next.prev = shark.prev;
            scope (exit)
                shark.destroy();
            if (sock)
            {
                sock.setHandshakeCallBack(null);
                startSocket(sock);
            }
        }
    }

    /**
        Builds the child pipeline for the stream, registers the resulting
        connection in the list, activates it and, when the heartbeat is
        enabled, adds it to the timing wheel. The stream is freed when the
        factory returns no pipeline.
    */
    void startSocket(TcpStream sock)
    {
        auto pipe = _pipeFactory.newPipeline(sock);
        if (!pipe)
        {
            gcFree(sock);
            return;
        }
        pipe.finalize();
        auto con = new ServerConnection!PipeLine(pipe);
        con.serverAceptor = this;

        // Link the new connection right behind the sentinel.
        con.next = _list.next;
        if (con.next)
            con.next.prev = con;
        con.prev = _list;
        _list.next = con;

        con.initialize();
        if (_wheel)
            _wheel.addNewTimer(con);
    }

private:
    // int[ServerConnection!PipeLine] _list;
    ServerConnection!PipeLine _list;

    version (USE_SSL)
    {
        SSLHandShark _sharkList;
    }

    TcpListener _acceptor;
    KissTimer _timer;
    TimingWheel _wheel;
    AcceptPipeline _pipe;
    shared PipelineFactory!PipeLine _pipeFactory;

    SSL_CTX* _sslctx = null;
}

/**
    One accepted connection: owns the child pipeline, is a node of the
    acceptor's connection list and a timer entry of the heartbeat wheel.
*/
@trusted final class ServerConnection(PipeLine) : WheelTimer, PipelineManager
{
    /// Wraps the pipeline and registers this object as its pipeline manager.
    this(PipeLine pipe)
    {
        _pipe = pipe;
        _pipe.pipelineManager = this;
    }

    ~this()
    {
    }

    /// Fires transportActive on the connection pipeline.
    void initialize()
    {
        _pipe.transportActive();
    }

    /// Fires transportInactive on the connection pipeline.
    void close()
    {
        _pipe.transportInactive();
    }

    /// The acceptor that owns this connection.
    @property serverAceptor()
    {
        return _manger;
    }

    /// ditto
    @property serverAceptor(ServerAcceptor!PipeLine manger)
    {
        _manger = manger;
    }

    /**
        PipelineManager hook: detaches the pipeline, stops the heartbeat
        timer entry and asks the acceptor to unlink and free this connection.
        The object must not be used after this call returns.
    */
    override void deletePipeline(PipelineBase pipeline)
    {
        pipeline.pipelineManager = null;
        _pipe = null;
        stop();
        _manger.remove(this);
    }

    /// PipelineManager hook: restarts the heartbeat timeout of this connection.
    override void refreshTimeout()
    {
        rest();
    }

    /// Heartbeat expiry: forwards the timeout to the pipeline, swallowing any exception.
    override void onTimeOut() nothrow
    {
        collectException(_pipe.timeOut());
    }

private:
    // Used only for the sentinel head of the acceptor's connection list.
    this()
    {
    }

    ServerConnection!PipeLine prev;
    ServerConnection!PipeLine next;
    ServerAcceptor!PipeLine _manger;
    PipeLine _pipe;
}

version (USE_SSL)
{
    /**
        Tracks one SSL connection while its handshake is in progress and
        reports the outcome exactly once through the callback: with the
        socket on success, with null on failure.
    */
    final class SSLHandShark
    {
        alias SSLHandSharkCallBack = void delegate(SSLHandShark shark, SSLSocket sock);

        /// Installs the close, read and handshake callbacks on the socket.
        this(SSLSocket sock, SSLHandSharkCallBack cback)
        {
            _socket = sock;
            _cback = cback;
            _socket.setCloseCallBack(&onClose);
            _socket.setReadCallBack(&readCallBack);
            _socket.setHandshakeCallBack(&handSharkCallBack);
        }

    protected:
        /// Handshake finished: hands the socket over to the callback.
        void handSharkCallBack()
        {
            logDebug("the ssl handshark over");
            // Release our reference before invoking the callback: the
            // callback destroys this object, so no member may be touched
            // once it has returned.
            auto sock = _socket;
            _socket = null;
            _cback(this, sock);
        }

        /// Data arriving during the handshake phase is ignored.
        void readCallBack(ubyte[] buffer)
        {
        }

        /// Socket closed before the handshake finished: reports failure (null socket).
        void onClose()
        {
            // The outcome has already been reported once the socket
            // reference is gone; nothing is left to do.
            if (_socket is null)
                return;
            logDebug("the ssl handshark fail");
            _socket.setCloseCallBack(null);
            _socket.setReadCallBack(null);
            _socket.setHandshakeCallBack(null);
            _socket = null;
            _cback(this, null);
        }

    private:
        // Used only for the sentinel head of the acceptor's handshake list.
        this()
        {
        }

        SSLHandShark prev;
        SSLHandShark next;
        SSLSocket _socket;
        SSLHandSharkCallBack _cback;
    }
}